{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# MapReduce\n",
    "\n",
    "**GOAL:** The goal of this exercise is to show how to implement a toy version of the MapReduce system on top of Ray.\n",
    "\n",
    "[MapReduce](https://en.wikipedia.org/wiki/MapReduce) is essentially a computational pattern for computing aggregate statistics of large datasets. It is the core primitive in systems like MapReduce, Hadoop, and Spark.\n",
    "\n",
    "At its core, MapReduce consists of two primitives:\n",
    "- The **map** transformation takes a dataset and a function and applies the function to each data point.\n",
    "- The **reduce** transformation aggregates the output of the map stage.\n",
    "\n",
    "For example, suppose that our starting point is a collection of documents. If we wish to count the number of occurrences of each word in the document, we can first apply a \"map\" transformation, which turns each document into a dictionary mapping words to the number of occurrences within that document. Then we can apply the \"reduce\" transformation, which sums the counts for each word."
   ]
  },
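  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The word-count example above can be sketched serially in plain Python, without Ray. This is only an illustration under assumptions: the two toy documents and the helper names `count_words` and `merge_counts` are made up here and are not part of the exercise.\n",
    "\n",
    "```python\n",
    "from collections import Counter\n",
    "from functools import reduce\n",
    "\n",
    "# Hypothetical toy documents, invented for illustration.\n",
    "documents = ['the cat sat', 'the dog sat down']\n",
    "\n",
    "\n",
    "def count_words(document):\n",
    "    # Map step: one document -> per-document word counts.\n",
    "    return Counter(document.split())\n",
    "\n",
    "\n",
    "def merge_counts(counts_a, counts_b):\n",
    "    # Reduce step: sum the counts for each word.\n",
    "    return counts_a + counts_b\n",
    "\n",
    "\n",
    "per_document_counts = [count_words(d) for d in documents]\n",
    "total_counts = reduce(merge_counts, per_document_counts)\n",
    "print(dict(total_counts))\n",
    "```\n",
    "\n",
    "Merging `Counter` objects is associative and commutative, so the per-document counts could be combined in any order."
   ]
  },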
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from __future__ import absolute_import\n",
    "from __future__ import division\n",
    "from __future__ import print_function\n",
    "\n",
    "import numpy as np\n",
    "import ray\n",
    "import time"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ray.init(num_cpus=4, include_webui=False, ignore_reinit_error=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Below is a serial implementation of the **map** function. Note that Python already has a [built-in map function](https://docs.python.org/2/library/functions.html#map)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def map_serial(function, xs):\n",
    "    return [function(x) for x in xs]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**EXERCISE:** Implement a parallel version of the map function. You'll need to modify the function below.\n",
    "\n",
    "**NOTE:** Because we want `map_parallel` to be non-blocking, the function signature for `map_parallel` should be different from the signature of `map_serial`.\n",
    "- The argument `function` must be a Ray remote function instead of a regular Python function.\n",
    "- The return value should be a list of Ray `ObjectID`s instead of a list of the actual transformed values (so that `map_parallel` can return immediately)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def map_parallel(function, xs):\n",
    "    \"\"\"Apply a remote function to each element of a list.\"\"\"\n",
    "    if not isinstance(xs, list):\n",
    "        raise ValueError('The xs argument must be a list.')\n",
    "    \n",
    "    if not hasattr(function, 'remote'):\n",
    "        raise ValueError('The function argument must be a remote function.')\n",
    "\n",
    "    # EXERCISE: Modify the list comprehension below to invoke \"function\"\n",
    "    # remotely on each element of \"xs\". This should essentially submit\n",
    "    # one remote task for each element of the list and then return the\n",
    "    # resulting list of ObjectIDs.\n",
    "    return [function(x) for x in xs]\n",
    "\n",
    "\n",
    "# ***** Do not change the code below! It verifies that \n",
    "# ***** the exercise has been done correctly. *****\n",
    "\n",
    "def increment_regular(x):\n",
    "    return x + 1\n",
    "\n",
    "\n",
    "@ray.remote\n",
    "def increment_remote(x):\n",
    "    return x + 1\n",
    "\n",
    "\n",
    "xs = [1, 2, 3, 4, 5]\n",
    "result_ids = map_parallel(increment_remote, xs)\n",
    "assert isinstance(result_ids, list), 'The output of \"map_parallel\" must be a list.'\n",
    "assert all([isinstance(x, ray.ObjectID) for x in result_ids]), 'The output of map_parallel must be a list of ObjectIDs.'\n",
    "assert ray.get(result_ids) == map_serial(increment_regular, xs)\n",
    "print('Congratulations, the test passed!')    "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**EXERCISE:** Run the cell below and verify that `parallel_map` runs instantaneously and that fetching the result takes the expected amount of time for a simple task that sleeps."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def sleep_regular(x):\n",
    "    time.sleep(1)\n",
    "    return x + 1\n",
    "\n",
    "\n",
    "@ray.remote\n",
    "def sleep_remote(x):\n",
    "    time.sleep(1)\n",
    "    return x + 1\n",
    "\n",
    "\n",
    "# Regular sleep should take 4 seconds.\n",
    "print('map_serial:')\n",
    "%time results_serial = map_serial(sleep_regular, [1, 2, 3, 4])\n",
    "\n",
    "# Initiaing the map_parallel should be instantaneous.\n",
    "print('\\ncalling map_parallel:')\n",
    "%time result_ids = map_parallel(sleep_remote, [1, 2, 3, 4])\n",
    "\n",
    "# Fetching the results from map_parallel should take 1 second\n",
    "# (since we started Ray with num_cpus=4).\n",
    "print('\\ngetting results from map_parallel:')\n",
    "%time results_parallel = ray.get(result_ids)\n",
    "\n",
    "assert results_parallel == results_serial"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**EXERCISE:** Take a look at the task timeline and verify that the four map tasks executed in parallel. Do this by running the next cell and clicking *\"View task timeline\"*.\n",
    "\n",
    "**NOTE:** This will show all tasks that have been executed since `ray.init()` was called, which may be a lot.\n",
    "\n",
    "To navigate the timeline:\n",
    "- Click and drag to move.\n",
    "- Scroll to zoom."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import ray.experimental.ui as ui\n",
    "ui.task_timeline()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Below is a serial implementation of a simple **reduce** function. Note that Python has a [built-in reduce function](https://docs.python.org/2/library/functions.html#reduce).\n",
    "\n",
    "The reduce function essentially aggregates all of the elements in a list (e.g., by summing them together)."
   ]
  },
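  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a point of comparison, the summing example can be written with Python's own `functools.reduce`. A minimal sketch, using `operator.add` as the aggregation function (this choice of function is an assumption made for illustration):\n",
    "\n",
    "```python\n",
    "from functools import reduce\n",
    "import operator\n",
    "\n",
    "xs = [1, 2, 3, 4, 5, 6, 7, 8]\n",
    "\n",
    "# reduce folds the list from left to right:\n",
    "# ((((((1 + 2) + 3) + 4) + 5) + 6) + 7) + 8\n",
    "total = reduce(operator.add, xs)\n",
    "print(total)\n",
    "```\n",
    "\n",
    "Folding 8 elements this way makes 7 calls to the aggregation function, one for each element after the first."
   ]
  },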
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def reduce_serial(function, xs):\n",
    "    if len(xs) == 1:\n",
    "        return xs[0]\n",
    "    \n",
    "    result = xs[0]\n",
    "    for i in range(1, len(xs)):\n",
    "        result = function(result, xs[i])\n",
    "\n",
    "    return result\n",
    "\n",
    "\n",
    "def add_regular(x, y):\n",
    "    time.sleep(0.3)\n",
    "    return x + y\n",
    "\n",
    "\n",
    "assert reduce_serial(add_regular, [1, 2, 3, 4, 5, 6, 7, 8]) == 36"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**EXERCISE:** Implement `reduce_parallel` below by modifying the serial reduce implementation to simply invoke `function` remotely (via `function.remote`).\n",
    "\n",
    "Note that the underlying assumption here is that **function** is commutative and associative. That is, it shouldn't matter what order the elements are aggregated in (this is necessary for achieving parallelism)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def reduce_parallel(function, xs):\n",
    "    if not isinstance(xs, list):\n",
    "        raise ValueError('The xs argument must be a list.')\n",
    "\n",
    "    if not hasattr(function, 'remote'):\n",
    "        raise ValueError('The function argument must be a remote function.')\n",
    "\n",
    "    if len(xs) == 1:\n",
    "        return xs[0]\n",
    "\n",
    "    result = xs[0]\n",
    "    for i in range(1, len(xs)):\n",
    "        result = function(result, xs[i])\n",
    "\n",
    "    return result\n",
    "\n",
    "\n",
    "@ray.remote\n",
    "def add_remote(x, y):\n",
    "    time.sleep(0.3)\n",
    "    return x + y\n",
    "\n",
    "\n",
    "xs = [1, 2, 3, 4, 5, 6, 7, 8]\n",
    "result_id = reduce_parallel(add_remote, xs)\n",
    "assert ray.get(result_id) == reduce_serial(add_regular, xs)\n",
    "print('Congratulations, the test passed!')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**EXERCISE:** Take a look at the timeline for the above computation graph from the call to `reduce_parallel`. Is there any parallelism?"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import ray.experimental.ui as ui\n",
    "ui.task_timeline()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "There is no parallelism above. That is because the tasks were executed in a for loop, and the output of each task was passed as a dependency to the subsequent task! In order to expose more parallelism, we need to reduce the elements in a different order.\n",
    "\n",
    "To illustrate the issue, note that we cannot execute the calls to `f` in parallel in\n",
    "\n",
    "```python\n",
    "f(f(f(f(f(f(f(1, 2), 3), 4), 5), 6), 7), 8)\n",
    "```\n",
    "\n",
    "which is what the above implementation does. However, we can execute some of the calls to `f` in parallel in\n",
    "\n",
    "```python\n",
    "f(f(f(1, 2), f(3, 4)), f(f(5, 6), f(7, 8)))\n",
    "```\n",
    "\n",
    "which is what a **tree reduce** does. Note that the two computations above are not equivalent in general, however, when `f` is associative and commutative, they are equivalent.\n",
    "\n",
    "**EXERCISE:** Modify the reduce implementation to expose more parallelism."
   ]
  },
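  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The difference between the two groupings above can be checked directly in plain Python. A small sketch (the helper names `chain` and `tree` are introduced here for illustration only): with addition both groupings agree, while with subtraction, which is neither associative nor commutative, they do not.\n",
    "\n",
    "```python\n",
    "def add(x, y):\n",
    "    # Addition is both associative and commutative.\n",
    "    return x + y\n",
    "\n",
    "\n",
    "def subtract(x, y):\n",
    "    # Subtraction is neither associative nor commutative.\n",
    "    return x - y\n",
    "\n",
    "\n",
    "def chain(f):\n",
    "    # Left-to-right grouping, as in the for-loop reduce.\n",
    "    return f(f(f(f(f(f(f(1, 2), 3), 4), 5), 6), 7), 8)\n",
    "\n",
    "\n",
    "def tree(f):\n",
    "    # Balanced grouping, as in a tree reduce.\n",
    "    return f(f(f(1, 2), f(3, 4)), f(f(5, 6), f(7, 8)))\n",
    "\n",
    "\n",
    "print(chain(add), tree(add))\n",
    "print(chain(subtract), tree(subtract))\n",
    "```\n",
    "\n",
    "This only compares results; it says nothing about timing, since everything here runs serially."
   ]
  },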
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def reduce_parallel_tree(function, xs):\n",
    "    if not isinstance(xs, list):\n",
    "        raise ValueError('The xs argument must be a list.')\n",
    "    \n",
    "    if not hasattr(function, 'remote'):\n",
    "        raise ValueError('The function argument must be a remote function.')\n",
    "\n",
    "    # The easiest way to implement this function is to simply invoke\n",
    "    # \"function\" remotely on the first two elements of \"xs\" and to append\n",
    "    # the result to the end of \"xs\". Then repeat until there is only one\n",
    "    # element left in \"xs\" and return that element.\n",
    "\n",
    "    # EXERCISE: Think about why that exposes more parallelism.\n",
    "\n",
    "    raise NotImplementedError\n",
    "\n",
    "\n",
    "xs = [1, 2, 3, 4, 5, 6, 7, 8]\n",
    "result_id = reduce_parallel_tree(add_remote, xs)\n",
    "assert ray.get(result_id) == reduce_serial(add_regular, xs)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**EXERCISE:** Take a look at the timeline and see if the tasks for `reduce_parallel_tree` were executed in parallel."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import ray.experimental.ui as ui\n",
    "ui.task_timeline()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**EXERCISE:** Run the cell below and verify that `reduce_parallel_tree` runs instantaneously and that fetching the result takes the expected amount of time.\n",
    "\n",
    "Each task takes 0.3 seconds, so both `reduce_serial` and `reduce_parallel` should take `8 * 0.3 = 2.4` seconds. The tree should have depth 3, so `reduce_parallel_tree` should take about `3 * 0.3 = 0.9` seconds."
   ]
  },
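  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The expected timings can be worked out with simple arithmetic. A rough sketch, assuming that every task takes exactly 0.3 seconds, that scheduling overhead is negligible, and that enough CPUs are free to run each level of the tree at once (the variable names are chosen here for illustration):\n",
    "\n",
    "```python\n",
    "import math\n",
    "\n",
    "num_elements = 8\n",
    "task_seconds = 0.3\n",
    "\n",
    "# Reducing n elements requires n - 1 calls to the function.\n",
    "num_calls = num_elements - 1\n",
    "\n",
    "# A chain runs the calls one after another.\n",
    "chain_seconds = num_calls * task_seconds\n",
    "\n",
    "# A balanced tree has ceil(log2(n)) levels, and the calls within\n",
    "# one level do not depend on each other.\n",
    "tree_depth = math.ceil(math.log2(num_elements))\n",
    "tree_seconds = tree_depth * task_seconds\n",
    "\n",
    "print(num_calls, round(chain_seconds, 1))\n",
    "print(tree_depth, round(tree_seconds, 1))\n",
    "```\n",
    "\n",
    "Measured times will be somewhat larger than these estimates because of scheduling and communication overhead."
   ]
  },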
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Regular sleep should take 4 seconds.\n",
    "print('reduce_serial:')\n",
    "%time results_serial = reduce_serial(add_regular, [1, 2, 3, 4, 5, 6, 7, 8])\n",
    "\n",
    "# Initiaing the map_parallel should be instantaneous.\n",
    "print('\\ncalling reduce_parallel:')\n",
    "%time result_ids = reduce_parallel(add_remote, [1, 2, 3, 4, 5, 6, 7, 8])\n",
    "\n",
    "# Fetching the results from map_parallel should take 1 second\n",
    "# (since we started Ray with num_cpus=4).\n",
    "print('\\ngetting results from reduce_parallel:')\n",
    "%time results_parallel = ray.get(result_ids)\n",
    "\n",
    "assert results_parallel == results_serial\n",
    "\n",
    "# Initiaing the map_parallel should be instantaneous.\n",
    "print('\\ncalling reduce_parallel_tree:')\n",
    "%time result_tree_ids = reduce_parallel_tree(add_remote, [1, 2, 3, 4, 5, 6, 7, 8])\n",
    "\n",
    "# Fetching the results from map_parallel should take 1 second\n",
    "# (since we started Ray with num_cpus=4).\n",
    "print('\\ngetting results from reduce_parallel_tree:')\n",
    "%time results_parallel_tree = ray.get(result_tree_ids)\n",
    "\n",
    "assert results_parallel_tree == results_serial"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
